package com.bfxy.rocketmq.producer.transaction;

import com.bfxy.rocketmq.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

/**
 * Demo consumer for the {@code test_tx_topic} topic.
 *
 * <p>Starts a {@link DefaultMQPushConsumer} in the consumer group
 * {@code test_tx_consumer_group_name}, subscribes to every tag of the topic
 * and prints each received message to standard output.
 */
public class TransactionConsumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {

        // Create the consumer in the group "test_tx_consumer_group_name".
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tx_consumer_group_name");

        // Lower and upper bounds for the number of consume threads.
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(20);
        // Name server address, taken from the project-wide constants.
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        // Where to start consuming from.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Subscribe to the topic the producer side publishes to, all tags.
        consumer.subscribe("test_tx_topic", "*");
        // Register the callback that handles delivered messages.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Handle every message in the delivered batch. Looking only at the
                // first element would acknowledge the remaining messages without
                // ever processing them whenever the batch holds more than one.
                for (MessageExt me : msgs) {
                    try {
                        String topic = me.getTopic();
                        String tag = me.getTags();
                        String keys = me.getKeys();
                        String body = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("收到事物消息,topic:" + topic + ",tag:" + tag + ",keys:" + keys + ",body:" + body);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Report failure so the broker delivers the batch again later.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("tx consumer started...");
    }
}
